#include "config.h"

#include <sys/statvfs.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <errno.h>
#include <sys/statfs.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "cluster.h"
#include "squeue.h"
#include "nodectl.h"
#include "token_bucket.h"
#include "cache.h"
#include "net_global.h"
#include "chunk_bh.h"
#include "core.h"
#include "../storage/md_parent.h"
#include "../controller/pool_ctl.h"
#include "../controller/volume_ctl.h"
#include "rate_probe.h"
#include "dbg.h"

/*
 * Operation flags stored in entry_t.op. They are tested and combined as a
 * bitmask (see the `ent->op & CHUNK_OP_*` checks and `ent->op |= CHUNK_OP_SYNC`).
 */
#define CHUNK_OP_NULL                       0x00000000
#define CHUNK_OP_SYNC                       0x00000001
#define CHUNK_OP_LOCALIZE                   0x00000002

/*
 * Task categories for the task queue. The value is stored in entry_t.type and
 * used as the index into queue_t.task_type[] for per-type accounting.
 * CHUNK_BH_TYPE_MAX is the number of categories (array size), not a real type.
 */
typedef enum {
        CHUNK_BH_SYNC,
        CHUNK_BH_LOCALIZE,
        CHUNK_BH_TYPE_MAX,
} type_t;

/* Number of task queues; a chunk is mapped to a queue by `chkid->id % CHUNK_BH_QUEUE`. */
#define CHUNK_BH_QUEUE 2
/* Once bh_queue.count exceeds this, chunk_bh_localize() silently drops new requests. */
#define CHUNK_BH_CAPACITY (100*1000)

/* Default token-bucket rate for localize tasks, used when no value is configured. */
#define CHUNK_BH_LOCALIZE_DF_RATE       1024
/* nodectl key the localize fill rate is read from. */
#define CHUNK_BH_LOCALIZE_RATE          "localize/fill_rate"
/* nodectl key the status text built by __chunk_bh_info_dump() is written to. */
#define CHUNK_BH_INFO                   "localize/info"

/* One task queue together with the worker that drains it. */
typedef struct {
        worker_handler_t sem;   /* worker handle: set up by worker_create(), kicked by worker_post() */
        squeue_t queue;         /* pending tasks, keyed by chunk id */
        uint64_t task_count;    /* number of tasks currently in this queue */
        uint64_t task_type[CHUNK_BH_TYPE_MAX]; /* per-type task counters, indexed by type_t */
} queue_t;

/* Global state shared by all task queues. */
typedef struct {
        queue_t queue[CHUNK_BH_QUEUE];
        sy_rwlock_t lock;       /* taken (write mode) around queue insert/remove/reorder and counter updates */
        uint64_t count;         /* total number of tasks across all queues */
        int threads;            /* workers currently holding a task (incremented when a task is picked up) */
} bh_queue_t;

/* A queued task describing the pending operation(s) for one chunk. */
typedef struct {
        chkid_t id;             /* chunk the task applies to; also the queue key */
        chkid_t _parent;        /* inline storage for the parent id */
        chkid_t *parent;        /* points at _parent, or NULL when no parent was supplied */
        nid_t source;           /* not referenced in the code visible in this file */
        int op;                 /* bitmask of CHUNK_OP_* flags */
        int type;               /* type_t value, index into queue_t.task_type[] */
        int localize;           /* not referenced in the code visible in this file */
        int dist_count;         /* not referenced in the code visible in this file */
        nid_t dist[LICH_REPLICA_MAX]; /* not referenced in the code visible in this file */
} entry_t;

/* Global queue state; initialised by chunk_bh_init(). */
static bh_queue_t bh_queue;
/* Token bucket consulted by the worker before running a localize task. */
static token_bucket_t __localize_bucket;
/* Counts executed localize tasks; read back in __chunk_bh_info_dump(). */
static rate_probe_t __localize_rate;

/* Time of the last status dump, used to throttle __chunk_bh_info_dump(). */
static time_t __last_dump;
/* Taken around the nodectl_set() call that publishes the status dump. */
static sy_rwlock_t __dump_lock__;

static uint32_t __key_from_int(const void *i)
{
        return ((chkid_t *)i)->id;
}

/*
 * Equality callback handed to squeue_init().
 *
 * @key   chkid_t being looked up
 * @data  squeue_entry_t whose `ent` member is an entry_t
 * @return non-zero when the stored task's chunk id equals the key, else 0
 */
static int __equal(const void *key, const void *data)
{
        const squeue_entry_t *node = data;
        entry_t *task = node->ent;

        return chkid_cmp((const chkid_t *)key, &task->id) == 0;
}

/*
 * Run a single sync attempt for a chunk, picking the controller from the ids:
 *   - parent is a volume chunk  -> volume_ctl_chunk_sync(parent, chkid)
 *   - chkid itself is a volume  -> volume_ctl_chunk_sync(chkid, chkid)
 *   - anything else             -> pool_ctl_chunk_sync(parent, chkid)
 *
 * @return 0 on success, otherwise the error code from the controller call.
 */
static int __chunk_bh_sync__(const chkid_t *parent, const chkid_t *chkid)
{
        int ret;

        if (parent->type == __VOLUME_CHUNK__)
                ret = volume_ctl_chunk_sync(parent, chkid, 0, NULL);
        else if (chkid->type == __VOLUME_CHUNK__)
                ret = volume_ctl_chunk_sync(chkid, chkid, 0, NULL);
        else
                ret = pool_ctl_chunk_sync(parent, chkid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Sync one chunk, retrying on transient errors.
 *
 * The parent id passed down is resolved as follows:
 *   - volume / pool chunks are their own parent (any _parent is ignored);
 *   - otherwise a caller-supplied _parent is used as is;
 *   - with no _parent, volume/pool sub-chunks derive it from their own id
 *     (type switched to the owning chunk type, idx reset to 0);
 *   - any other type is resolved through md_parent_get().
 *
 * EAGAIN and ENOSPC are retried with a 100ms pause between attempts; the
 * error is returned once the retry counter has passed 20. Any other error is
 * returned immediately.
 *
 * @return 0 on success, otherwise the error code of the last attempt.
 */
static int __chunk_bh_sync(const chkid_t *_parent, const chkid_t *chkid)
{
        int ret, attempts = 0;
        chkid_t owner;

        if (chkid->type == __VOLUME_CHUNK__ || chkid->type == __POOL_CHUNK__) {
                owner = *chkid;
        } else if (_parent != NULL) {
                owner = *_parent;
        } else if (chkid->type == __VOLUME_SUB_CHUNK__) {
                owner = *chkid;
                owner.type = __VOLUME_CHUNK__;
                owner.idx = 0;
        } else if (chkid->type == __POOL_SUB_CHUNK__) {
                owner = *chkid;
                owner.type = __POOL_CHUNK__;
                owner.idx = 0;
        } else {
                ret = md_parent_get(chkid, &owner);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        for (;;) {
                ret = __chunk_bh_sync__(&owner, chkid);
                if (!unlikely(ret))
                        break;

                /* only EAGAIN / ENOSPC are considered transient */
                if (ret != EAGAIN && ret != ENOSPC)
                        GOTO(err_ret, ret);

                if (attempts > 20)
                        GOTO(err_ret, ret);

                DINFO("chunk "CHKID_FORMAT" sync, retry %u\n",
                      CHKID_ARG(chkid), attempts);
                attempts++;
                usleep(100 * 1000);
        }

        return 0;
err_ret:
        return ret;
}


/*
 * Ask the volume controller to localize one raw chunk.
 *
 * Asserts that chkid is a raw chunk and that parent is non-NULL. A failure of
 * volume_ctl_localize() is only logged; the function always returns 0.
 */
static int __chunk_bh_localize(const chkid_t *parent, const chkid_t *chkid)
{
        int err;

        YASSERT(chkid->type == __RAW_CHUNK__);
        YASSERT(parent);

        DBUG("chunk "CHKID_FORMAT" localize\n",
              CHKID_ARG(chkid));

        err = volume_ctl_localize(parent, chkid->idx);
        if (!unlikely(err))
                return 0;

        /* failure is reported but not propagated to the caller */
        DWARN("chunk "CHKID_FORMAT" localize fail %u, %s\n",
              CHKID_ARG(chkid), err, strerror(err));

        return 0;
}

/*
 * Pairs a task with a result slot.
 * NOTE(review): not referenced anywhere in the code visible in this file —
 * confirm whether it is still needed.
 */
typedef struct {
        entry_t *ent;
        int retval;
} core_ctx_t;

/*
 * Callback passed to core_request() by __chunk_bh_exec().
 *
 * Pulls the entry_t * out of the va_list and runs the operations flagged in
 * its op mask: sync first, then localize.
 *
 * @return 0 on success, otherwise the error code of the failing operation.
 */
static int __chunk_bh_exec__(va_list ap)
{
        int ret;
        entry_t *task = va_arg(ap, entry_t *);

        va_end(ap);

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk_bh_exec op %d "CHKID_FORMAT"/"CHKID_FORMAT"\n", task->op, CHKID_ARG(task->parent), CHKID_ARG(&task->id));
#else
        DBUG("chunk_bh_exec op %d "CHKID_FORMAT"/"CHKID_FORMAT"\n", task->op, CHKID_ARG(task->parent), CHKID_ARG(&task->id));
#endif

        if ((task->op & CHUNK_OP_SYNC) != 0) {
                ret = __chunk_bh_sync(task->parent, &task->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if ((task->op & CHUNK_OP_LOCALIZE) != 0) {
                ret = __chunk_bh_localize(task->parent, &task->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Execute the operations flagged in a task.
 *
 * When the task's parent is a volume chunk, the work is handed to
 * core_request() on the core selected by core_hash(parent), with
 * __chunk_bh_exec__ as the callback. Otherwise the operations run inline in
 * the calling thread: sync first, then localize.
 *
 * A task may carry no parent (entry_t.parent == NULL). Such a task cannot be
 * hashed to a core, so it takes the inline path, where __chunk_bh_sync()
 * resolves the parent itself.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int __chunk_bh_exec(entry_t *ent)
{
        int ret;

        /* guard against parent-less tasks before looking at the parent type */
        if (ent->parent != NULL && ent->parent->type == __VOLUME_CHUNK__) {
                ret = core_request(core_hash(ent->parent), -1, "chunk_bh",
                                   __chunk_bh_exec__, ent);
                if (unlikely(ret)) {
                        DBUG("chunk "CHKID_FORMAT" op %d fail ret:%d\n", CHKID_ARG(&ent->id), ent->op, ret);
                        GOTO(err_ret, ret);
                }
        } else {
                if (ent->op & CHUNK_OP_SYNC) {
                        ret = __chunk_bh_sync(ent->parent, &ent->id);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                if (ent->op & CHUNK_OP_LOCALIZE) {
                        ret = __chunk_bh_localize(ent->parent, &ent->id);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Remove the task keyed by chkid from the queue, update the counters and free
 * the task. Everything happens under bh_queue.lock (write mode).
 *
 * A failing squeue_remove() is handed to UNIMPLEMENTED(__DUMP__).
 *
 * @return 0 on success, or the error from taking the lock.
 */
static int __chunk_bh_destory_task(queue_t *queue, chkid_t *chkid)
{
        int ret;
        entry_t *task;

        ret = sy_rwlock_wrlock(&bh_queue.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_remove(&queue->queue, chkid, (void **)&task);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        YASSERT(chkid_cmp(chkid, &task->id) == 0);

        /* accounting first, then release the entry itself */
        queue->task_type[task->type]--;
        queue->task_count--;
        bh_queue.count--;
        yfree((void **)&task);

        sy_rwlock_unlock(&bh_queue.lock);

        return 0;
err_ret:
        return ret;
}

/*
 * Push the task keyed by chkid to the tail of its queue so other tasks can
 * run first. Called by the worker when no localize token is available.
 *
 * After the move, if the queue holds nothing but localize tasks
 * (task_count <= localize count), sleep 100ms instead of spinning.
 *
 * @return 0 on success, or the error from taking the lock.
 */
static int __chunk_bh_yield_task(queue_t *queue, chkid_t *chkid)
{
        int ret;

        ret = sy_rwlock_wrlock(&bh_queue.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_move_tail(&queue->queue, chkid);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        sy_rwlock_unlock(&bh_queue.lock);

        if (queue->task_type[CHUNK_BH_LOCALIZE] >= queue->task_count) {
                DBUG("chunk localize, token not enough!\n");
                sy_msleep(100);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Publish a text snapshot of the queue state under the CHUNK_BH_INFO nodectl
 * key: global counters first, then one section per queue.
 *
 * Throttled: while tasks are still queued, a dump within 2 seconds of the
 * previous one is skipped.
 *
 * The text is built in a single buffer with bounded snprintf() appends, so a
 * long report is truncated instead of overflowing the buffer.
 */
static void __chunk_bh_info_dump()
{
        int i, ret, len;
        size_t used;
        uint64_t rate;
        queue_t *queue;
        char value[MAX_BUF_LEN];

        time_t now = gettime();
        if (now - __last_dump < 2 && bh_queue.count > 0)
                return;

        __last_dump = now;

        rate_probe_detect(&__localize_rate, &rate);
        len = snprintf(value, sizeof(value), "chunk_bh info:\n"
                        "queue_count:%d\n"
                        "count:%llu\n"
                        "threads:%d\n"
                        "fill_rate:%f\n"
                        "rate:%llu\n",
                        CHUNK_BH_QUEUE,
                        (LLU)bh_queue.count,
                        bh_queue.threads,
                        __localize_bucket.rate,
                        (LLU)rate);
        if (len < 0) {
                /* encoding error: start from an empty, terminated string */
                value[0] = '\0';
                used = 0;
        } else if ((size_t)len >= sizeof(value)) {
                /* header alone was truncated; buffer is full */
                used = sizeof(value) - 1;
        } else {
                used = (size_t)len;
        }

        for (i = 0; i < CHUNK_BH_QUEUE; i++) {
                /* no room left for another section */
                if (used >= sizeof(value) - 1)
                        break;

                queue = &bh_queue.queue[i];
                len = snprintf(value + used, sizeof(value) - used, "queue%d:\n"
                                "count:%llu\n"
                                "sync:%llu\n"
                                "localize:%llu\n",
                                i, (LLU)queue->task_count,
                                (LLU)queue->task_type[CHUNK_BH_SYNC],
                                (LLU)queue->task_type[CHUNK_BH_LOCALIZE]);
                if (len < 0) {
                        value[used] = '\0';
                        break;
                }

                if ((size_t)len >= sizeof(value) - used)
                        used = sizeof(value) - 1;
                else
                        used += (size_t)len;
        }

        ret = sy_rwlock_wrlock(&__dump_lock__);
        if (unlikely(ret))
                return;

        ret = nodectl_set(CHUNK_BH_INFO, value);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        sy_rwlock_unlock(&__dump_lock__);
}

/*
 * Worker routine registered through worker_create() for one queue.
 *
 * @arg  the queue_t this worker drains.
 *
 * Repeatedly takes the first task of the queue and runs it until the queue
 * reports ENOENT (empty), then returns 0. Localize tasks must first obtain a
 * token from __localize_bucket; when none is available the task is moved to
 * the tail of the queue and the loop continues with the next one.
 *
 * NOTE(review): presumably the worker framework calls this again after each
 * worker_post() — confirm against the worker implementation.
 */
static int __chunk_bh_worker(void *arg)
{
        int ret, is_ready;
        entry_t *ent;
        queue_t *queue;
        queue = arg;

        while (1) {
                ret = sy_rwlock_wrlock(&bh_queue.lock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* peek only: the task stays queued until __chunk_bh_destory_task() */
                ret = squeue_getfirst(&queue->queue, (void **)&ent);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                /* queue drained: leave the loop */
                                sy_rwlock_unlock(&bh_queue.lock);
                                ent = NULL;
                                break;
                        } else
                                UNIMPLEMENTED(__DUMP__);
                }

                /* incremented while bh_queue.lock is held */
                bh_queue.threads++;

                DBUG("threads %u queue %llu\n", bh_queue.threads,
                      (LLU)bh_queue.count);

                sy_rwlock_unlock(&bh_queue.lock);

                if (ent->op & CHUNK_OP_LOCALIZE) {
                        /* rate-limit localize work through the token bucket */
                        token_bucket_consume(&__localize_bucket, 1, &is_ready, NULL);
                        if (!is_ready) {
                                /* no token: requeue at the tail and try another task */
                                __chunk_bh_yield_task(queue, &ent->id);
                                /* NOTE(review): decremented without bh_queue.lock, unlike the increment above */
                                bh_queue.threads--;
                                continue;
                        } else {
                                rate_probe_increase(&__localize_rate, 1);
                        }
                }
                /* return values of exec/destroy are ignored; the task is removed either way */
                __chunk_bh_exec(ent);
                /* frees ent; it must not be touched after this call */
                __chunk_bh_destory_task(queue, &ent->id);
                __chunk_bh_info_dump();
                /* NOTE(review): decremented without bh_queue.lock, unlike the increment above */
                bh_queue.threads--;
        }

        return 0;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return ret;
}

/*
 * Queue a background localize request for a chunk.
 *
 * @parent  parent id of the chunk; must be non-NULL when a new task is created
 * @chkid   chunk to localize; selects the queue via `id % CHUNK_BH_QUEUE`
 *
 * Behaviour:
 *   - if the total number of queued tasks exceeds CHUNK_BH_CAPACITY, the
 *     request is dropped and 0 is returned;
 *   - if no task exists for the chunk, a new localize task is inserted and
 *     the queue's worker is posted;
 *   - if a pure localize task already exists, nothing is done;
 *   - if a task with a different op mask exists, EBUSY is returned.
 *
 * @return 0 on success (including the dropped / already-queued cases),
 *         otherwise an error code.
 */
int chunk_bh_localize(const chkid_t *parent, const chkid_t *chkid)
{
        int ret;
        entry_t *ent;
        queue_t *queue;

        DBUG("localize chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        queue = &bh_queue.queue[(chkid->id) % CHUNK_BH_QUEUE];

        ret = sy_rwlock_wrlock(&bh_queue.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (bh_queue.count > CHUNK_BH_CAPACITY) {
                DBUG("chunk bh queue overflow %d, localize req while cast off\n",
                                CHUNK_BH_CAPACITY);
                sy_rwlock_unlock(&bh_queue.lock);
                return 0;
        }

        ret = squeue_get(&queue->queue, chkid, (void **)&ent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* validate the input before allocating anything */
                        YASSERT(parent);

                        ret = ymalloc((void**)&ent, sizeof(*ent));
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        /* zero every field, as chunk_bh_sync() does, so the
                         * members not set below never hold garbage */
                        memset(ent, 0x0, sizeof(*ent));

                        ent->id = *chkid;
                        ent->_parent = *parent;
                        ent->parent = &ent->_parent;
                        ent->op = CHUNK_OP_LOCALIZE;
                        ent->type = CHUNK_BH_LOCALIZE;

                        ret = squeue_insert(&queue->queue, &ent->id, ent, 0);
                        if (unlikely(ret))
                                GOTO(err_free, ret);

                        worker_post(&queue->sem);

                        bh_queue.count++;
                        queue->task_count++;
                        queue->task_type[ent->type]++;
                } else
                        GOTO(err_lock, ret);
        } else {
                if (ent->op == CHUNK_OP_LOCALIZE) {
                        DBUG("localize chunk "CHKID_FORMAT", exist\n", CHKID_ARG(chkid));
                } else {
                        DWARN("localize chunk "CHKID_FORMAT" busy, op %u\n", CHKID_ARG(chkid), ent->op);
                        ret = EBUSY;
                        GOTO(err_lock, ret);
                }
        }

        sy_rwlock_unlock(&bh_queue.lock);

        return 0;
err_free:
        yfree((void **)&ent);
err_lock:
        sy_rwlock_unlock(&bh_queue.lock);
err_ret:
        return ret;
}

/*
 * Queue a background sync request for a chunk.
 *
 * NOTE(review): the function currently returns 0 unconditionally before doing
 * anything, so every statement after the first `return 0;` is unreachable and
 * background sync is effectively disabled. Confirm whether this is intended
 * before removing either the early return or the dead code.
 *
 * What the unreachable body would do: look the chunk up in its queue; when
 * absent, create a zeroed sync task (parent optional) and insert it with the
 * given priority, then post the worker; when present, OR CHUNK_OP_SYNC into
 * the existing task's op mask.
 *
 * @parent    parent id of the chunk, may be NULL
 * @chkid     chunk to sync
 * @priority  passed through to squeue_insert()
 * @return    0 (always, given the early return)
 */
int chunk_bh_sync(const chkid_t *parent, const chkid_t *chkid, int priority)
{
        int ret;
        entry_t *ent;
        queue_t *queue;

        /* early exit: disables everything below */
        return 0;
        
#if ENABLE_CHUNK_DEBUG
        DINFO("sync chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
#else
        DBUG("sync chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
#endif

        queue = &bh_queue.queue[(chkid->id) % CHUNK_BH_QUEUE];

        ret = sy_rwlock_wrlock(&bh_queue.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_get(&queue->queue, chkid, (void **)&ent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        ret = ymalloc((void**)&ent, sizeof(*ent));
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        memset(ent, 0x0, sizeof(*ent));

                        ent->id = *chkid;
                        if (parent) {
                                ent->_parent = *parent;
                                ent->parent = &ent->_parent;
                        }  else {
                                ent->parent = NULL;
                        }

                        ent->op = CHUNK_OP_SYNC;
                        ent->type = CHUNK_BH_SYNC;

                        ret = squeue_insert(&queue->queue, &ent->id, ent, priority);
                        if (unlikely(ret))
                                GOTO(err_free, ret);

                        worker_post(&queue->sem);

                        bh_queue.count++;
                        queue->task_count++;
                        queue->task_type[ent->type]++;
                } else
                        GOTO(err_lock, ret);
        } else {
                DBUG("chunk "CHKID_FORMAT" in queue\n", CHKID_ARG(chkid));
                /* a task already exists for this chunk: add the sync flag to it */
                if ((ent->op & CHUNK_OP_SYNC) == 0) {
                        ent->op |= CHUNK_OP_SYNC;
                }
        }

        DBUG("sync chunk "CHKID_FORMAT", op: %d\n",
                        CHKID_ARG(chkid), ent->op);

        sy_rwlock_unlock(&bh_queue.lock);

        return 0;
err_free:
        yfree((void **)&ent);
err_lock:
        sy_rwlock_unlock(&bh_queue.lock);
err_ret:
        return ret;
}

/*
 * Read the localize fill rate from nodectl (falling back to
 * CHUNK_BH_LOCALIZE_DF_RATE), clamp it to at least 1 and apply it to the
 * localize token bucket.
 *
 * @_fill_rate  optional out-parameter receiving the applied rate; may be NULL
 */
static void __config_get_fill_rate(int *_fill_rate)
{
        int rate = nodectl_get_int(CHUNK_BH_LOCALIZE_RATE, CHUNK_BH_LOCALIZE_DF_RATE);

        /* a non-positive rate is replaced by the minimum of 1 */
        if (rate < 1)
                rate = 1;

        token_bucket_set(&__localize_bucket, "chunk_bh", rate, rate, rate, 0, 0);

        if (_fill_rate != NULL)
                *_fill_rate = rate;
}

/*
 * Change callback for the CHUNK_BH_LOCALIZE_RATE nodectl key: re-reads and
 * applies the fill rate, refreshes the published status and logs the new
 * value. Both arguments are ignored.
 *
 * @return always 0
 */
int __localize_fill_rate(void *context, uint32_t mask)
{
        int applied;

        (void) mask;
        (void) context;

        __config_get_fill_rate(&applied);
        __chunk_bh_info_dump();
        DINFO("chunk localize fill_rate %d\n", applied);

        return 0;
}

/*
 * Reset callback for the CHUNK_BH_LOCALIZE_RATE nodectl key: re-registers the
 * key with its default value and restores the default rate on the localize
 * token bucket. Both arguments are ignored.
 *
 * A failing nodectl_register() is handed to UNIMPLEMENTED(__DUMP__).
 *
 * @return always 0
 */
int __localize_fill_rate_reset(void *context, uint32_t mask)
{
        int ret;
        char def_rate[24];

        (void) context;
        (void) mask;

        /* bounded formatting: never write past def_rate */
        snprintf(def_rate, sizeof(def_rate), "%d", CHUNK_BH_LOCALIZE_DF_RATE);
        ret = nodectl_register(CHUNK_BH_LOCALIZE_RATE, def_rate,
                        __localize_fill_rate, __localize_fill_rate_reset, NULL);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        token_bucket_set(&__localize_bucket, "chunk_bh", CHUNK_BH_LOCALIZE_DF_RATE,
                        CHUNK_BH_LOCALIZE_DF_RATE, CHUNK_BH_LOCALIZE_DF_RATE, 0, 0);
        DBUG("chunk localize fill_rate reset %d\n", CHUNK_BH_LOCALIZE_DF_RATE);

        return 0;
}

int chunk_bh_init()
{
        int ret, i;
        queue_t *queue;

        bh_queue.count = 0;
        bh_queue.threads = 0;

        ret = sy_rwlock_init(&bh_queue.lock, "chunk_bh.queue");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_rwlock_init(&__dump_lock__, "chunk_bh.dump");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        token_bucket_init(&__localize_bucket, "chunk_bh", CHUNK_BH_LOCALIZE_DF_RATE,
                        CHUNK_BH_LOCALIZE_DF_RATE, CHUNK_BH_LOCALIZE_DF_RATE, 0, 0);
        rate_probe_init(&__localize_rate, 1);

        __config_get_fill_rate(NULL);

#if 0
        // TODO many fnotify_register
        char def_rate[MAX_BUF_LEN];
        sprintf(def_rate,"%d",CHUNK_BH_LOCALIZE_DF_RATE);
        ret = nodectl_register(CHUNK_BH_LOCALIZE_RATE, def_rate,
                        __localize_fill_rate, __localize_fill_rate_reset, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        for (i = 0; i < CHUNK_BH_QUEUE; i++) {
                queue = &bh_queue.queue[i];

                ret = squeue_init(&queue->queue, 1024, __equal, __key_from_int);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = worker_create(&queue->sem, "chunk_bh", __chunk_bh_worker,
                                    NULL, queue, WORKER_TYPE_SEM, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
